
// Copyright (c) WanSheng Intelligent Corp. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.


#include <atomic>
#include <stdlib.h>
#include <string>
#include <map>
#include<stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "duration_dist.h"




/// Builds a named distribution counter.
///
/// @param name              label printed in reports; must not be NULL because
///                          it is used to construct m_name.
/// @param period            width of one bucket; counting() files a duration
///                          under bucket index duration / period.
/// @param quick_count_base  index of the first bucket served by the flat array.
/// @param quick_count_size  number of consecutive buckets kept in the flat
///                          array; 0 means no array is allocated and every
///                          bucket lives in the map.
CDistCounter::CDistCounter(const char * name, unsigned int period, 
    unsigned int quick_count_base , unsigned int quick_count_size ):
        m_name(name),
        m_period(period),
        m_alarmed(false),
        m_alarm_threshold(0),
        m_over_cnt(0),
        m_total(0),
        m_alarm_hander(NULL),
        m_quick_counts(NULL),
        m_quick_count_base(quick_count_base),
        m_missing_counts(0),
        m_atomic_lock(NULL)
{
    // Must be defined on every path: counting(), reset() and get_report()
    // all read it, including when no flat array was requested.
    m_quick_count_size = 0;

    m_stats_start = time(NULL);

    // Put the spin flag into the clear (unlocked) state explicitly rather
    // than relying on how the flag happens to be initialised.
    std::atomic_flag * flag = new std::atomic_flag();
    flag->clear();
    m_atomic_lock = flag;

    pthread_mutex_init(&m_condition_mutex, NULL);

    if(quick_count_size)
    {
        // calloc zero-fills the array and guards the count*size multiplication.
        m_quick_counts = (unsigned int*)calloc(quick_count_size, sizeof(m_quick_counts[0]));
        if(m_quick_counts)
        {
            m_quick_count_size = quick_count_size;
        }
        else
        {
            // Degrade to map-only counting; m_quick_count_size stays 0.
            printf("set_quick_counts: malloc fail\n");
        }
    }
}

/// Tears the counter down: drops the bucket map, destroys the mutex and
/// releases the spin flag and the flat bucket array.
CDistCounter::~CDistCounter()
{
    m_durations.clear();
    pthread_mutex_destroy(&m_condition_mutex);

    // m_atomic_lock holds the std::atomic_flag allocated by the constructor.
    delete static_cast<std::atomic_flag *>(m_atomic_lock);

    // free() accepts NULL, so no guard is needed when no array was allocated.
    free(m_quick_counts);
}


/// Acquires the counter's spin flag.
///
/// @param block_wait  when true, retry (sleeping 1 microsecond between
///                    attempts) until the flag is acquired; when false, give
///                    up after the first failed attempt.
/// @return true if the flag was acquired, false if it was busy and
///         block_wait was false.
///
/// Note: the pthread mutex m_condition_mutex is not used here; the
/// std::atomic_flag stored in m_atomic_lock is the actual lock.
bool CDistCounter::lock(bool block_wait)
{
    std::atomic_flag * const spin = static_cast<std::atomic_flag *>(m_atomic_lock);

    for (;;)
    {
        if (!spin->test_and_set())
            return true;

        if (!block_wait)
            return false;

        usleep(1);
    }
}

/// Releases the spin flag taken by lock().
void CDistCounter::unlock()
{
    static_cast<std::atomic_flag *>(m_atomic_lock)->clear();
}



void CDistCounter::counting(unsigned int duration)
{
    unsigned int key = duration /m_period;
    
    // we may miss a few counts during reporting to log file.
    // but we definfitely don't want to block the caller.
    if(!lock(false)) 
    {
        m_missing_counts ++;
        return;
    }

    m_total ++;

    if(m_quick_counts && key >= m_quick_count_base && key < (m_quick_count_base+m_quick_count_size))
    {
        m_quick_counts[key-m_quick_count_base] ++;
    }
    else
    {
        std::map<unsigned int, unsigned int>::iterator it;
        it = m_durations.find(key);
        if(it == m_durations.end())
            m_durations[key] = 1;
        else
            m_durations[key] ++;
    }

   unlock();

    if (m_alarm_threshold && duration >= m_alarm_threshold)
    {
        m_over_cnt ++;
        m_alarmed = 1;
        if(m_alarm_hander)
        {
            m_alarm_hander(this);
        }
    }
}

/// Clears all collected statistics and restarts the collection window at the
/// current time.
void CDistCounter::reset()
{
    // m_total and the bucket containers are updated by counting() while it
    // holds the lock, so they must be cleared under the same lock.
    lock();
    m_total = 0;
    m_durations.clear();
    if(m_quick_counts)
        memset(m_quick_counts, 0, m_quick_count_size*sizeof(m_quick_counts[0]));
    unlock();

    // These fields are written by counting() outside the lock as well, so
    // taking the lock here would not add any protection.
    m_over_cnt = 0;
    m_alarmed = false;
    m_missing_counts = 0;
    m_stats_start = time(NULL);
}

/// Formats the current distribution as text.
///
/// The report consists of a header (name, bucket width, window start time,
/// alarm flag, totals), the reporting time, and one "[low - high]: count"
/// line per non-empty bucket: flat-array buckets first, then the map buckets.
///
/// @param stream  when non-NULL, every piece of the report is also written to
///                this stream as it is produced.
/// @return a malloc()-allocated, NUL-terminated buffer that the caller must
///         release with free(), or NULL if the allocation failed. If the
///         buffer fills up, the report is cut at the last complete line.
char * CDistCounter::get_report(FILE* stream)
{
    static const int kMaxReportBytes = 16*1000*1000;
    static const int kHeaderBytes    = 1024;
    static const int kBytesPerEntry  = 48;

    // Count the non-empty buckets under the lock so the containers are not
    // read while counting() is modifying them.
    size_t entries = 0;
    lock();
    if(m_quick_counts)
    {
        for(unsigned int i=0;i<m_quick_count_size;i++)
        {
            if(m_quick_counts[i]) entries++;
        }
    }
    entries += m_durations.size();
    unlock();

    // Header budget + room for the name + a fixed allowance per bucket line,
    // capped at kMaxReportBytes. The arithmetic is arranged so it cannot wrap.
    int size = kMaxReportBytes;
    const size_t name_len = m_name.size();
    const size_t budget = (size_t)kMaxReportBytes - kHeaderBytes;
    if(name_len < budget && entries <= (budget - name_len) / kBytesPerEntry)
        size = (int)(kHeaderBytes + name_len + entries * kBytesPerEntry);

    char * buffer = (char *) malloc(size);
    if(buffer == NULL)
    {
        printf("CDistCounter::get_report, malloc %d fail.\n", size);
        return NULL;
    }
    buffer[0] = '\0';

    char * pos = buffer;

    // Accepts the result of an snprintf() into pos/size. On success the text
    // is echoed to stream and the cursor advances; on failure or truncation
    // the partial text is discarded so the buffer ends on a complete line.
    // size never drops below 1, so writing the terminator at pos is safe.
    auto commit = [&](int written) -> bool
    {
        if(written <= 0 || written >= size)
        {
            *pos = '\0';
            return false;
        }
        if(stream) fputs(pos, stream);
        size -= written;
        pos += written;
        return true;
    };

    // localtime_r writes into a caller-owned struct, unlike localtime().
    struct tm when;
    if(localtime_r(&m_stats_start, &when) == NULL)
        memset(&when, 0, sizeof(when));

    // tm_mon counts months from 0, hence the +1 for display.
    if(!commit(snprintf(pos, size,
            "\n[%s]\n Distribution range: %u\nStart time: %02d-%02d %02d:%02d:%02d\nAlarms = %d\nTotal num = %u, Missing counts=%u\n",
            m_name.c_str(),
            m_period,
            when.tm_mon + 1,
            when.tm_mday,
            when.tm_hour,
            when.tm_min,
            when.tm_sec,
            m_alarmed,
            m_total,
            m_missing_counts)))
    {
        return buffer;
    }

    time_t now = time(NULL);
    if(localtime_r(&now, &when) == NULL)
        memset(&when, 0, sizeof(when));

    if(!commit(snprintf(pos, size, "reporting time: %02d-%02d %02d:%02d:%02d\n",
            when.tm_mon + 1,
            when.tm_mday,
            when.tm_hour,
            when.tm_min,
            when.tm_sec)))
    {
        return buffer;
    }

    // Buckets may have been added since the size estimate; commit() bounds
    // every write, so a late-growing map only shortens the report.
    bool room = true;

    lock();
    if(m_quick_counts)
    {
        for(unsigned int i=0; room && i< m_quick_count_size; i++)
        {
            if(m_quick_counts[i] == 0)
                continue;

            room = commit(snprintf(pos, size, "[%u - %u]: %u\n",
                    (m_quick_count_base +i) * m_period,
                    (m_quick_count_base +i+1) * m_period,
                    m_quick_counts[i]));
        }
    }

    std::map<unsigned int, unsigned int>::iterator it;
    for (it = m_durations.begin(); room && it != m_durations.end(); ++it)
    {
        room = commit(snprintf(pos, size, "[%u - %u]: %u\n",
                it->first * m_period,
                (it->first+1) * m_period,
                it->second));
    }
    unlock();

    return buffer;
}

/// Tells whether the current collection window has lasted at least
/// `internval`.
///
/// @param ms_to_expiry  written only when the function returns false; receives
///                      the time remaining until the window expires. Despite
///                      the name, the value is in the same unit as time()
///                      differences and `internval`.
/// @param internval     required window length.
/// @return true when the window has expired (ms_to_expiry is left untouched),
///         false otherwise.
bool CDistCounter::check(int * ms_to_expiry, unsigned int internval)
{
    const time_t elapsed = time(NULL) - m_stats_start;

    if(elapsed < internval)
    {
        *ms_to_expiry = (internval - elapsed);
        return false;
    }

    return true;
}

//
//   *********** class CDistManager   ***********
//

// Owns a set of named CDistCounter objects and, once run() has been called,
// a background thread that periodically appends their reports to a file.
// Counters stored in m_counters are deleted by the destructor.
class CDistManager
{
public:
    // report_path: file the reports are appended to; when NULL the
    // constructor falls back to "/tmp/duration_stats.txt".
    CDistManager(const char * report_path = NULL);
    ~CDistManager();
    // Heap-allocated quit flag polled by the reporter thread. Allocated in
    // run(); the destructor sets it to 1 and deliberately does not free it.
    int * m_thread_quit;
    // Set to true by the reporter thread when it starts; run() and the thread
    // entry point both return early when it is already true.
    bool m_running;
    // Counter name -> CDistCounter*, stored as void *.
    std::map<std::string, void *> m_counters;
    // Locked by add_counter() and check() around their accesses to m_counters.
    pthread_mutex_t m_condition_mutex;
    // File that check() opens in append mode for each report.
    std::string m_report_path;
    // Destination for error messages; the constructor sets it to stderr.
    FILE * log_fp;

    // report interval in seconds
    unsigned int m_report_interval;

    // Returns the counter registered under name, or NULL when there is none.
    CDistCounter * find_counter(const char * name);
    // Registers counter under name (or under the counter's own m_name when
    // name is NULL) unless a counter with that name already exists.
    void add_counter(CDistCounter *, const char * name = NULL);

    // Records duration on the named counter; returns false when no counter
    // with that name is registered.
    bool count(const char* counter_name, unsigned int duration);

    // Starts the reporter thread unless m_running is already true.
    void run();

    // Reports and resets every counter whose interval has elapsed; the return
    // value is used by the reporter thread as its next sleep time, with -1
    // mapped to 60.
    int check();


};

/// Entry point of the reporter thread.
///
/// Repeatedly calls CDistManager::check() and sleeps for the number of seconds
/// it returns (60 when check() returns -1) until the quit flag becomes
/// non-zero.
///
/// @param arg  the CDistManager that started the thread.
/// @return always NULL.
///
/// NOTE(review): the quit flag is a plain int shared between threads and the
/// manager is dereferenced on every iteration; destroying the manager while
/// this thread is inside check() is not protected against here.
static void * thread_run_report_check(void* arg)
{
    CDistManager * manager = (CDistManager*) arg;
    if(manager->m_running)
        return NULL;

    manager->m_running = true;

    // Keep our own copy of the pointer: the flag is a separate heap object so
    // it can still be read after the manager itself has been deleted.
    int * thread_quit = manager->m_thread_quit;

    while(*thread_quit == 0)
    {
        int next_expiry = manager->check();
        if(next_expiry == -1)
            next_expiry = 60;
        else if(next_expiry < 1)
            next_expiry = 1;    // never spin: sleep(0) would return at once

        sleep(next_expiry);
    }

    // The manager's destructor leaves the flag allocated for this thread to
    // read; once the loop has seen it, this thread is its last user.
    free(thread_quit);

    return NULL;
}

/// Creates a manager with no counters and no reporter thread.
///
/// @param report_path  file that reports are appended to; when NULL,
///                     "/tmp/duration_stats.txt" is used.
///
/// Error messages go to stderr and the report interval starts at 600 seconds.
CDistManager::CDistManager(const char * report_path ):
m_thread_quit(NULL),
m_running(false),
m_report_path(report_path ? report_path : "/tmp/duration_stats.txt"),
log_fp(stderr),
m_report_interval(10* 60)
{
    pthread_mutex_init(&m_condition_mutex, NULL);
}

/// Signals the reporter thread to stop, deletes every registered counter and
/// destroys the mutex.
CDistManager::~CDistManager()
{
    // Ask the reporter thread to quit. The flag itself is intentionally not
    // freed here: the thread still reads it after this object is gone.
    if(m_thread_quit != NULL)
        *m_thread_quit = 1;

    // The map stores the counters as void *, so cast back before deleting.
    for (auto & entry : m_counters)
        delete static_cast<CDistCounter *>(entry.second);

    m_counters.clear();

    pthread_mutex_destroy(&m_condition_mutex);
}

/// Looks up a counter by name.
///
/// @param name  counter name; NULL is treated as "not found".
/// @return the registered counter, or NULL when none matches.
///
/// This function does not lock m_condition_mutex itself.
CDistCounter * CDistManager::find_counter(const char * name)
{
    // Building the std::string key from a NULL pointer is undefined behaviour.
    if(name == NULL)
        return NULL;

    std::map<std::string, void *>::iterator it = m_counters.find(name);
    if(it == m_counters.end())
        return NULL;

    return static_cast<CDistCounter *>(it->second);
}

/// Registers a counter unless one with the same name already exists.
///
/// @param counter  counter to register; dereferenced when name is NULL.
/// @param name     key to register under; when NULL the counter's own m_name
///                 is used.
///
/// When the name is already taken the map is left unchanged and `counter` is
/// not stored.
void CDistManager::add_counter(CDistCounter * counter, const char * name)
{
    const char * key = (name != NULL) ? name : counter->m_name.c_str();

    pthread_mutex_lock(&m_condition_mutex);
    if(find_counter(key) == NULL)
        m_counters[key] = counter;
    pthread_mutex_unlock(&m_condition_mutex);
}

bool CDistManager::count(const char* counter_name, unsigned int duration)
{
    CDistCounter * counter = find_counter(counter_name);
    if(!counter)
        return false;

    counter->counting(duration);
    return true;
}


/// Starts the background reporter thread.
///
/// Does nothing when m_running is already true or when the quit flag cannot
/// be allocated. On thread-creation failure an error is logged to log_fp and
/// the manager is left without a quit flag.
void CDistManager::run()
{
    if(m_running)
        return;

    m_thread_quit = (int* )malloc(sizeof (int));
    if(m_thread_quit == NULL)
    {
        return;
    }

    *m_thread_quit = 0;

    pthread_t tid;
    // pthread_create reports failure through its return value, not errno.
    const int rc = pthread_create (&tid, NULL, thread_run_report_check, this);
    if (rc != 0)
    {
        fprintf( log_fp, "can't create thread_run_report_check :[%d]\n", rc);

        // No thread will ever read the flag, so release it here.
        free(m_thread_quit);
        m_thread_quit = NULL;
        return;
    }

    // The thread id is not kept and the thread is never joined; detach it so
    // its resources are reclaimed when it exits.
    pthread_detach(tid);
}


/// check() is executed in the dedicated reporter thread.
/// m_counters is walked under m_condition_mutex, the same mutex add_counter()
/// holds while inserting.
///
/// For every counter whose window has lasted at least m_report_interval, the
/// report is appended to m_report_path and the counter is reset.
///
/// @return the shortest time, in seconds, until any counter is next due, or
///         -1 when no counters are registered.
int CDistManager::check()
{
    int next_expiry = -1;
    std::map<std::string,void *>::iterator it;

    pthread_mutex_lock(&m_condition_mutex);
    for (it = m_counters.begin(); it != m_counters.end(); ++it)
    {
        CDistCounter *counter = (CDistCounter *) it->second;
        int next = 0;
        if(counter->check(&next, m_report_interval))
        {
            // get_report() returns NULL when its buffer cannot be allocated.
            char * report = counter->get_report();
            if(report)
            {
                FILE * fp = fopen(m_report_path.c_str(), "a");
                if(fp)
                {
                    fputs(report, fp);
                    fclose(fp);
                }
                else
                {
                    fprintf(log_fp, "open report file (%s) failed. (%d)", 
                        m_report_path.c_str(), errno);
                }
                free(report);
            }

            counter->reset();
            next = m_report_interval;
        }

        // Wake up for the counter that is due soonest, so none of them is
        // reported later than its interval.
        if(next_expiry == -1 || next < next_expiry)
            next_expiry = next;
    }

    pthread_mutex_unlock(&m_condition_mutex);

    return next_expiry;
}


/********************************************************************
 * 
 * 
 *  C Interfaces to expose
 * 
 * 
 * ******************************************************************/


/// Returns the process-wide default manager, creating it on the first call.
/// The instance is never deleted by this function.
duration_manager_t ws_get_default_count_mgr()
{
    static CDistManager * const shared_instance = new CDistManager();
    return shared_instance;
}


/// Allocates a new, independent manager with the default report path.
/// Release it with ws_destory_duration_manager().
duration_manager_t ws_create_duration_manager()
{
    CDistManager * fresh = new CDistManager();
    return fresh;
}

/// Deletes a manager; its destructor also deletes the counters it holds.
void ws_destory_duration_manager(duration_manager_t manager)
{
    CDistManager * doomed = static_cast<CDistManager*>(manager);
    delete doomed;
}

/// Configures the report interval (and optionally the report file) of a
/// manager and starts its reporter thread.
///
/// @param manager         manager to configure; NULL selects the default one.
/// @param report_seconds  new report interval in seconds.
/// @param report_path     new report file; NULL keeps the current path.
void ws_start_duration_report(duration_manager_t manager, unsigned int report_seconds, const char * report_path)
{
    CDistManager* target = static_cast<CDistManager*>(manager);
    if(!target)
        target = static_cast<CDistManager*>(ws_get_default_count_mgr());

    target->m_report_interval = report_seconds;

    if(report_path != NULL)
        target->m_report_path = report_path;

    target->run();
}



/// Returns the counter named counter_name on the given manager, creating and
/// registering it with a flat bucket array when it does not exist yet.
///
/// @param manager           owning manager; NULL selects the default one.
/// @param counter_name      name to look up or register under.
/// @param period            bucket width passed to the new counter.
/// @param quick_count_base  first bucket index of the flat array.
/// @param quick_count_size  number of buckets in the flat array.
/// @return the existing or newly created counter.
duration_counter_t ws_create_duration_counter2(duration_manager_t manager, const char * counter_name, unsigned int period, 
    unsigned int quick_count_base,
    unsigned int quick_count_size)
{
    CDistManager* owner = static_cast<CDistManager*>(manager);
    if(!owner)
        owner = static_cast<CDistManager*>(ws_get_default_count_mgr());

    CDistCounter * result = owner->find_counter(counter_name);
    if(result == NULL)
    {
        result = new CDistCounter(counter_name, period, quick_count_base, quick_count_size);
        owner->add_counter(result);
    }

    return result;
}

/// Returns the counter named counter_name on the given manager, creating and
/// registering it when it does not exist yet.
///
/// @param manager       owning manager; NULL selects the default one.
/// @param counter_name  name to look up or register under.
/// @param period        bucket width passed to the new counter.
/// @return the existing or newly created counter.
duration_counter_t ws_create_duration_counter(duration_manager_t manager, const char * counter_name, unsigned int period)
{
    CDistManager* owner = static_cast<CDistManager*>(manager);
    if(!owner)
        owner = static_cast<CDistManager*>(ws_get_default_count_mgr());

    CDistCounter * result = owner->find_counter(counter_name);
    if(result == NULL)
    {
        result = new CDistCounter(counter_name, period);
        owner->add_counter(result);
    }

    return result;
}

/// Records one sample on the given counter handle.
void ws_count_duration(duration_counter_t counter, unsigned int duration)
{
    static_cast<CDistCounter*>(counter)->counting(duration);
}

/// Sets the alarm threshold and alarm callback of a counter.
///
/// @param counter          counter handle to configure.
/// @param alarm_threshold  samples at or above this value raise the alarm;
///                         0 disables the check in counting().
/// @param alarm_handler    callback invoked by counting() when the alarm
///                         fires; may be NULL.
void count_alarm_settings(duration_counter_t counter, unsigned int alarm_threshold,  
    bh_duration_alarm_handler alarm_handler)
{
    CDistCounter * target = static_cast<CDistCounter*>(counter);

    target->m_alarm_threshold = alarm_threshold;
    target->m_alarm_hander    = alarm_handler;
}

/// Records one sample on the counter named counter_name.
///
/// @param manager       manager to search; NULL selects the default one.
/// @param counter_name  name of the counter.
/// @param duration      value to record.
/// @return false when the manager has no counter with that name.
bool ws_count_duration2(duration_manager_t manager,
        const char * counter_name,
        unsigned int duration)
{
    CDistManager* owner = static_cast<CDistManager*>(manager);
    if(!owner)
        owner = static_cast<CDistManager*>(ws_get_default_count_mgr());

    const bool counted = owner->count(counter_name, duration);
    return counted;
}

/// Returns the text report of a counter, as produced by
/// CDistCounter::get_report() called with no arguments.
char * ws_counter_report(duration_counter_t counter)
{
    return static_cast<CDistCounter*>(counter)->get_report();
}